package com.daoshu.mti.kafkaclient.controller;

import com.daoshu.mti.kafkaclient.constant.Topic;
import com.daoshu.mti.kafkaclient.producer.KafKaConsumerProducer;
import com.daoshu.mti.kafkaclient.service.SendMsgService;
import com.daoshu.mti.kafkaclient.service.TKmTldpService;
import com.daoshu.mti.kafkaclient.thread.ConsumerMsgThread;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.*;

/**
 * @author liBai
 * @Classname SendMsgController
 * @Description 发送消息接口类
 * @Date 2019-05-14 10:11
 */
@Slf4j
@RestController
public class SendMsgController {
    @Autowired
    private KafKaConsumerProducer producer;
    @Autowired
    private KafkaTemplate kafkaTemplate;
    @Resource
    private SendMsgService sendMsgService;
    @Autowired
    private ConsumerMsgThread consumerMsgThread1;
    private ConsumerMsgThread consumerMsgThread2;
    @Autowired
    private TKmTldpService tKmTldpService;
    /***
     * 发送消息体为基本类型的消息
     */
    @GetMapping("/sendSimple")
    public String sendSimple() {
        String data = "{\"adress\":\"\",\"border_type\":\"0\",\"cert_no\":\"E07506617\",\"cert_type\":\"PP\",\"city\":\"\",\"cki_chg\":\"\",\"cmt\":\"\",\"ct_dt\":\"20201107 18:52:00\",\"datasource\":\"1\",\"delete_flag\":\"0\",\"delete_flt_id\":\"\",\"dupcode\":\"\",\"edi\":\"\",\"ffqno\":\"\",\"filename\":\"DEST_20201107184648.txt.Z\",\"flight_no\":\"\",\"flt_airlcode\":\"CZ\",\"flt_date\":\"2020-11-07\",\"flt_number\":\"3997\",\"flt_suffix\":\"\",\"icao_code\":\"\",\"import_time\":null,\"jmp\":\"\",\"paxclass\":\"\",\"pdt_bir_adress\":\"\",\"pdt_birthday\":\"\",\"pdt_country\":\"\",\"pdt_dept\":\"\",\"pdt_dest\":\"\",\"pdt_exprirydate\":\"\",\"pdt_firstname\":\"\",\"pdt_issue_country\":\"\",\"pdt_issuedate\":\"\",\"pdt_lastname\":\"\",\"pdt_midname\":\"\",\"pg_gxsj\":\"2020-11-07 18:53:56.588341\",\"pg_rksj\":\"2020-11-07 18:53:56.588341\",\"pnr_ref\":\"\",\"postcode\":\"\",\"province\":\"\",\"psr_acc\":\"\",\"psr_aec\":\"\",\"psr_agent\":\"86219\",\"psr_arrive_time\":\"2020-11-07 18:42:36\",\"psr_arst\":\"\",\"psr_asoboi\":\"\",\"psr_avih\":\"\",\"psr_baggage\":\"\",\"psr_bags\":\"\",\"psr_bagwgt\":\"\",\"psr_blnd\":\"\",\"psr_brdno\":\"79\",\"psr_bsct\":\"\",\"psr_chnname\":\"\",\"psr_citycanc\":\"\",\"psr_ckipid\":\"68297\",\"psr_ckitime\":\"2020-11-07 15:11:00\",\"psr_class\":\"R\",\"psr_crs\":\"JPXKE9\",\"psr_ctca\":\"\",\"psr_deaf\":\"\",\"psr_dep\":\"\",\"psr_edi_dept\":\"\",\"psr_edi_dest\":\"\",\"psr_edii\":\"\",\"psr_edio\":\"\",\"psr_edi_warn\":\"\",\"psr_efmn\":\"7842432312456\",\"psr_emig\":\"\",\"psr_exstseat\":\"\",\"psr_fffr\":\"\",\"psr_foid_nonet\":\"\",\"psr_gender\":\"\",\"psr_group\":\"\",\"psr_hbprbg\":\"\",\"psr_hostnbr\":\"210\",\"psr_ics\":\"PFN909\",\"psr_inad\":\"\",\"psr_inbound\":\"\",\"psr_inf\":\"\",\"psr_infname\":\"\",\"psr_msg\":\"\",\"psr_name\":\"ZHANGCHAOLI\",\"psr_notify\":\"\",\"psr_office\":\"PKX099\",\"psr_ofl\":\"\",\"psr_osr\":\"\",\"psr_outbound\":\"\",\"psr_pctc\":\"\",\"psr_petc\":\"\",\"psr_pil\":\"\",\"psr_psm\":\"\",\"psr_pspt\":\"\",\"psr_rea\":\"\",\"psr_rffi\":\"\",\"psr_rst\":\"\",\"psr_rtfi\":\"\",\"psr_rush\":\"\",\"psr_sbyno\":\"\",\"psr_sea\":\"\",\"psr_seatnbr\":\"31K\",\"psr_seg_seatnbr\":\"\",\"psr_sip\":\"\",\"psr_spe\":\"\",\"psr_spml\":\"\",\"psr_spml_id\":\"\",\"psr_status\":\"AC\",\"psr_stdrs\":\"\",\"psr_tsi\":\"\",\"psr_type\":\"\",\"psr_udgrade\":\"\",\"psr_unattach\":\"\",\"psr_vudgrade\":\"\",\"psr_wcbd\":\"\",\"psr_wcbw\":\"\",\"psr_wcmd\":\"\",\"psr_wcob\":\"\",\"psr_wtype\":\"\",\"psr_xabp\":\"\",\"psr_xasr\":\"\",\"psr_xres\":\"\",\"psr_xt\":\"\",\"rksj\":null,\"sby\":\"\",\"seg_dept_code\":\"PKX\",\"seg_dept_code_cn\":null,\"seg_deptno\":\"1\",\"seg_dest_code\":\"KMG\",\"seg_dest_code_cn\":\"长水国际机场\",\"seg_destno\":\"2\",\"seg_fltid\":\"94779429\",\"sn_id\":\"10525183246\",\"special_bg\":\"\",\"special_seat\":\"\",\"sta_arvetm\":\"2020-11-07 23:00:00\",\"sta_depttm\":\"2020-11-07 19:20:00\",\"stcr\":\"\",\"svc\":\"\",\"telno\":\"SZX237-05NOV20-1222-T SZX/SZX/T0755-82179797/SHENZHEN ZHONGYUA-N AVIATION SERVIC/CTC  T SZX/ECO- LTD\",\"ures\":\"\",\"vip\":\"\",\"wl\":\"\",\"xxzjbh\":\"cd618a80-f0a0-4aeb-8c2f-9656eef98094\"}";
        producer.sendMessage(Topic.MHLG, data);
        return "success";
    }

    /***
     * 多消费者组、组中多消费者对同一主题的消费情况
     */
    @GetMapping("/sendGroup")
    public String sendGroup() {
        for (int i = 0; i < 4; i++) {
            // 第二个参数指定分区，第三个参数指定消息键 分区优先
            int i2 = i % 4 ;
            log.info("partition = {}",i2);
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(Topic.TLDP, i % 4, "key", "hello group " + i);
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    log.info("发送消息失败,{}" + throwable.getMessage());
                }

                @Override
                public void onSuccess(SendResult<String, Object> sendResult) {
                    log.info("发送消息成功,{}", sendResult.toString());
                }
            });

        }
        return "group success";
    }

    /**
     * 获取消息
     * @return
     */
    @GetMapping("/getMsg")
    public String getMsg(){
        long startTime = System.currentTimeMillis();
        String msg = sendMsgService.getMsg2();
        long endTime = System.currentTimeMillis();
        log.info("获取任务时间：{}ms",endTime-startTime);
        return msg;
    }

    /**
     * 处理完成业务回传业务唯一标识修改状态
     * @param fuBh 全局唯一服务标识
     */
    @GetMapping("/resolve")
    public String resolve(@RequestParam("fuBh") Long fuBh){
        sendMsgService.resolve(fuBh);
        return "success";
    }

    @GetMapping("/getMsgThread")
    public String getMsgThread() throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        StringBuilder builder = new StringBuilder(10);
        for (int i = 0;i<=10;i++){
            ConsumerMsgThread thread1 = new ConsumerMsgThread(sendMsgService);
            Future future = pool.submit(thread1);
            String msg = future.get()==null?"abc":future.get().toString();
            builder.append(msg).append(";");
        }
        pool.shutdown();
        return builder.toString();

//        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
//                new ArrayBlockingQueue<Runnable>(5));
//
//        for(int i=0;i<15;i++){
//            executor.execute(consumerMsgThread);
//            System.out.println("线程池中线程数目："+executor.getPoolSize()+"，队列中等待执行的任务数目："+
//                    executor.getQueue().size()+"，已执行玩别的任务数目："+executor.getCompletedTaskCount());
//        }
//        executor.shutdown();

        //return "success";
    }
}
